package spark.structStreaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.StructType

/**
  * Created by 71065 on 2017/11/7 0007.
  */
/**
  * Streaming word-count over Kafka: reads JSON events from the
  * `kafka_flume_hdfs1` topic, extracts the `$.action` field, and
  * continuously counts occurrences per action, printing the running
  * totals to the console every 25 seconds.
  */
object StructedStreamingKafka {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("StructedStreamingKafka")
      .getOrCreate()

    import spark.implicits._
    import org.apache.spark.sql.functions.get_json_object

    // BUG FIX: the original used `spark.read` (a *batch* source), but then
    // called `.writeStream` below — Spark rejects that at analysis time with
    // "'writeStream' can be called only on streaming Dataset/DataFrame".
    // `readStream` makes this an actual streaming query.
    // Note: `endingOffsets` is a batch-only option and is not allowed on a
    // streaming read, so it has been dropped.
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "cdhmanager:9092")
      .option("subscribe", "kafka_flume_hdfs1")
      .option("startingOffsets", "earliest")
      .load()
      .selectExpr("CAST(value AS STRING)") // Kafka `value` is binary; decode to text
      .as[String]

    // Pull the "action" attribute out of each JSON payload.
    // (The extra .cast("string") from the original was redundant — the
    // selectExpr above already yields a string column.)
    val words = df.select(get_json_object($"value", "$.action").alias("actionCount"))

    // Running count per action value. `complete` output mode below replays
    // the full aggregation table on every trigger.
    val wordCounts = words.groupBy("actionCount").count()

    // BUG FIX: the original called wordCounts.show() here, which throws
    // "Queries with streaming sources must be executed with writeStream.start()"
    // on a streaming Dataset. The console sink below serves that purpose.
    val stream = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .trigger(Trigger.ProcessingTime("25 seconds"))
      .start()

    // Block the driver until the query terminates (or fails).
    stream.awaitTermination()
  }
}

case class DeviceData1(msgType: String, did: String, gid: String, ip: String, thirdCloudId: String, registered: String)